/*
 * Copyright 2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package example.springdata.redis.sync;

import static org.assertj.core.api.Assertions.*;
import static org.springframework.data.redis.connection.stream.StreamOffset.*;

import example.springdata.redis.SensorData;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.data.redis.test.autoconfigure.DataRedisTest;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import com.redis.testcontainers.RedisContainer;

/**
 * Integration tests walking through the synchronous Redis Stream API: adding and reading records via
 * {@link StreamOperations} and continuously consuming them through a {@link StreamMessageListenerContainer}.
 * <p>
 * Runs against a Redis instance started through Testcontainers; every test starts from an empty database.
 *
 * @author Christoph Strobl
 * @author Mark Paluch
 */
@Testcontainers
@DataRedisTest
class SyncStreamApiTests {

	@Container
	@ServiceConnection
	static RedisContainer redis = new RedisContainer(DockerImageName.parse("redis:7"));

	@Autowired StringRedisTemplate template;
	@Autowired StreamMessageListenerContainer<String, MapRecord<String, String, String>> messageListenerContainer;

	private StreamOperations<String, String, String> streamOps;

	/**
	 * Flushes all Redis data and obtains the {@link StreamOperations} used by the tests.
	 */
	@BeforeEach
	void setUp() {

		// clear all - the connection is closed again so it is released instead of leaking once per test
		try (var connection = template.getRequiredConnectionFactory().getConnection()) {
			connection.serverCommands().flushAll();
		}

		streamOps = template.opsForStream();
	}

	/**
	 * Exercises {@code XADD} (fixed and autogenerated ids), {@code XLEN} and {@code XREAD} (from start and from an
	 * offset).
	 */
	@Test
	void basics() {

		// XADD with fixed id
		var fixedId1 = streamOps.add(SensorData.RECORD_1234_0);
		assertThat(fixedId1).isEqualTo(SensorData.RECORD_1234_0.getId());

		var fixedId2 = streamOps.add(SensorData.RECORD_1234_1);
		assertThat(fixedId2).isEqualTo(SensorData.RECORD_1234_1.getId());

		// XLEN
		assertThat(streamOps.size(SensorData.KEY)).isEqualTo(2L);

		// XADD errors when the id is not greater than the last inserted one
		assertThatExceptionOfType(RedisSystemException.class).isThrownBy(() -> {
			streamOps.add(SensorData.create("1234", "19.8", "invalid").withId(RecordId.of("0-0")));
		});

		// XADD with autogenerated id
		var autogeneratedId = streamOps.add(SensorData.create("1234", "19.8", null));

		assertThat(autogeneratedId.getValue()).endsWith("-0");
		assertThat(streamOps.size(SensorData.KEY)).isEqualTo(3L);

		// XREAD from start
		var fromStart = streamOps.read(fromStart(SensorData.KEY));
		assertThat(fromStart).hasSize(3).extracting(MapRecord::getId).containsExactly(fixedId1, fixedId2, autogeneratedId);

		// XREAD resume after
		var fromOffset = streamOps.read(StreamOffset.create(SensorData.KEY, ReadOffset.from(fixedId2)));
		assertThat(fromOffset).hasSize(1).extracting(MapRecord::getId).containsExactly(autogeneratedId);
	}

	/**
	 * Registers a listener with the {@link StreamMessageListenerContainer} and verifies that records added afterwards
	 * are delivered to it in insertion order.
	 *
	 * @throws InterruptedException if the test thread is interrupted while waiting.
	 */
	@Test
	void continuousRead() throws InterruptedException {

		// container autostart is disabled by default
		if (!messageListenerContainer.isRunning()) {
			messageListenerContainer.start();
		}

		var streamListener = CapturingStreamListener.create();

		// XREAD BLOCK
		var subscription = messageListenerContainer.receive(fromStart(SensorData.KEY), streamListener);

		try {

			// give the subscription a moment to poll; nothing has been added yet
			TimeUnit.MILLISECONDS.sleep(100);

			assertThat(streamListener.recordsReceived()).isEqualTo(0);

			streamOps.add(SensorData.RECORD_1234_0);
			streamOps.add(SensorData.RECORD_1234_1);

			assertThat(streamListener.take().getId()).isEqualTo(SensorData.RECORD_1234_0.getId());
			assertThat(streamListener.take().getId()).isEqualTo(SensorData.RECORD_1234_1.getId());
			assertThat(streamListener.recordsReceived()).isEqualTo(2);

			streamOps.add(SensorData.RECORD_1235_0);

			assertThat(streamListener.take().getId()).isEqualTo(SensorData.RECORD_1235_0.getId());
			assertThat(streamListener.recordsReceived()).isEqualTo(3);
		} finally {

			// unregister the subscription so its polling task does not outlive this test on the shared container
			messageListenerContainer.remove(subscription);
		}
	}
}
